#!/bin/bash

# Start `cdc_kafka_consumer` in the background, replicating from the given
# Kafka sink URI into the local MySQL-compatible downstream.
#
# parameter 1: work directory (required); log files are written here
# parameter 2: sink_uri (required); passed to the consumer as --upstream-uri
# parameter 3: consumer_replica_config (optional); passed as --config when non-empty
# parameter 4: schema_registry_uri (optional); passed as --schema-registry-uri when non-empty
# parameter 5: log suffix (optional); appended to the log file base names
#
# The consumer's stdout/stderr are appended to
# <workdir>/cdc_kafka_consumer_stdout<log suffix>.log.

set -e

workdir=${1:?work directory (parameter 1) is required}
sink_uri=${2:?sink_uri (parameter 2) is required}
consumer_replica_config=$3
schema_registry_uri=$4
log_suffix=$5
# Remember where we started so we can return there after launching the consumer.
original_dir=$(pwd)

echo "[$(date)] <<<<<< START kafka consumer in $TEST_NAME case >>>>>>"
cd "$workdir"

# Because there is no transaction concept in Kafka,
# we need to set `batch-dml-enable` to false to avoid data inconsistency.
downstream_uri="mysql://root@127.0.0.1:3306/?safe-mode=true&batch-dml-enable=false"

# Build the command line as an array so every value stays a single word:
# the URIs contain `?` and `&`, which must never be glob-expanded or split.
consumer_args=(
	--log-file "$workdir/cdc_kafka_consumer$log_suffix.log"
	--log-level info
	--upstream-uri "$sink_uri"
	--downstream-uri "$downstream_uri"
)

# some consumer may require `consumer_replica_config`, set it separately
if [ -n "$consumer_replica_config" ]; then
	echo "consumer replica config found: $consumer_replica_config"
	consumer_args+=(--config "$consumer_replica_config")
fi

if [ -n "$schema_registry_uri" ]; then
	echo "schema registry uri found: $schema_registry_uri"
	consumer_args+=(--schema-registry-uri "$schema_registry_uri")
fi

# Run in the background; a failure of the consumer itself does not abort this
# script (background jobs are not covered by `set -e`).
cdc_kafka_consumer "${consumer_args[@]}" >>"$workdir/cdc_kafka_consumer_stdout$log_suffix.log" 2>&1 &

cd "$original_dir"
